﻿using log4net;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Wq.Core.Common;
using Wq.Core.Common.Helper;

namespace Wq.Core.Extensions.Assembly.RabbitMQ
{
    /// <summary>
    /// Base class for RabbitMQ producers/consumers. The queue name, routing key and
    /// delay mode are read from the <c>RabbitAttribute</c> applied to the concrete
    /// subclass; broker settings come from the <c>Rabbit:*</c> application settings.
    /// </summary>
    /// <remarks>
    /// The constructor opens a connection and a channel and declares the exchange,
    /// the queue and their binding. Connection failures are logged, not thrown; in
    /// that case <see cref="Publish"/> and <see cref="Subscribe"/> throw
    /// <see cref="InvalidOperationException"/>. This class never closes the
    /// connection or the channel itself.
    /// </remarks>
    public class RabbitListener
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(RabbitListener));

        /// <summary>Connection to the broker; stays null when the constructor failed to connect.</summary>
        public IConnection connection;

        /// <summary>Channel used for declaring, publishing, consuming and acknowledging.</summary>
        public IModel channel;

        /// <summary>Name of the regular (non-delayed) exchange.</summary>
        public string exchangeName = Appsettings.app("Rabbit:Exchange");

        /// <summary>Name of the delayed-message exchange (regular exchange name + "Delay").</summary>
        public string delayExchangeName = Appsettings.app("Rabbit:Exchange") + "Delay";

        string exchangeType = "direct"; // type of the regular exchange
        string queueName = "";          // queue name, derived from the attribute
        string routingKey = "";         // routing key, taken from the attribute
        bool Delay = false;             // true when messages go through the delayed exchange

        /// <summary>
        /// Reads the <c>RabbitAttribute</c> of the concrete type, connects to the broker
        /// and declares the exchange, queue and binding this listener works with.
        /// </summary>
        public RabbitListener()
        {
            foreach (Attribute attr in this.GetType().GetCustomAttributes(typeof(RabbitAttribute), true))
            {
                if (attr.GetType() == typeof(RabbitAttribute))
                {
                    var model = (RabbitAttribute)attr;
                    // NOTE(review): Delay has not been assigned from the attribute yet at this
                    // point, so the suffix is "False" even for delayed listeners. The order is
                    // kept on purpose: the queues are declared durable, and changing the name
                    // would leave already-declared queues (and their messages) orphaned.
                    // Fix together with a queue migration.
                    queueName = $"{exchangeName}.{model.Name}.{Delay.ToString()}";
                    routingKey = model.RoutingKey;
                    Delay = model.Delay;
                }
            }

            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = Appsettings.app("Rabbit:HostName"),
                    UserName = Appsettings.app("Rabbit:UserName"),
                    Password = Appsettings.app("Rabbit:Password"),
                    Port = AmqpTcpEndpoint.UseDefaultPort
                };
                this.connection = factory.CreateConnection();
                channel = connection.CreateModel();

                string boundExchange;
                if (Delay)
                {
                    // The "x-delayed-message" exchange type routes like the exchange type
                    // given in "x-delayed-type" once the per-message delay has elapsed.
                    var exchangeArgs = new Dictionary<string, object>()
                    {
                        {"x-delayed-type","direct" }
                    };
                    channel.ExchangeDeclare(delayExchangeName, "x-delayed-message", true, false, exchangeArgs);
                    boundExchange = delayExchangeName;
                }
                else
                {
                    channel.ExchangeDeclare(exchangeName, exchangeType);
                    boundExchange = exchangeName;
                }

                // Durable, non-exclusive, non-auto-delete queue bound with the routing key.
                channel.QueueDeclare(queueName, true, false, false, null);
                channel.QueueBind(queueName, boundExchange, routingKey);
            }
            catch (Exception ex)
            {
                // Best effort: construction must not throw; the failure surfaces later
                // through EnsureChannel() or the channel itself.
                log.Error(ex);
            }
        }

        /// <summary>
        /// Producer: publishes a persistent UTF-8 encoded message with this listener's routing key.
        /// </summary>
        /// <param name="message">Message text to publish.</param>
        /// <param name="DelayTimes">
        /// Value sent in the <c>x-delay</c> header; only used when the listener is
        /// configured for delayed delivery.
        /// </param>
        /// <exception cref="InvalidOperationException">The channel was never created.</exception>
        public void Publish(string message, long DelayTimes = 0)
        {
            EnsureChannel();

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            var targetExchange = exchangeName;
            if (Delay)
            {
                properties.Headers = new Dictionary<string, object>()
                {
                    {"x-delay",DelayTimes }
                };
                targetExchange = delayExchangeName;
            }

            var msg = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(targetExchange, routingKey, properties, msg);
        }

        /// <summary>
        /// Consumer: starts consuming the queue with manual acknowledgements and a prefetch
        /// count of 1. Every concrete subclass must declare a public <c>Handler</c> method
        /// taking the message body as a string; it is invoked on this instance.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The channel was never created, or the concrete type has no public <c>Handler</c> method.
        /// </exception>
        public void Subscribe()
        {
            EnsureChannel();

            var type = this.GetType();
            var Handle = type.GetMethod("Handler");
            if (Handle == null)
            {
                throw new InvalidOperationException(
                    $"{type.FullName} must declare a public Handler method to subscribe.");
            }

            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
            var consumer = new EventingBasicConsumer(channel);

            // Attach the handler before BasicConsume so no delivery can be dispatched
            // while the consumer has no subscriber.
            consumer.Received += (model, ea) =>
            {
                var msgBody = Encoding.UTF8.GetString(ea.Body.Span);
                try
                {
                    Handle.Invoke(this, new object[] { msgBody });
                }
                catch (Exception ex)
                {
                    // Reflection wraps handler failures; log the underlying cause when present.
                    log.Error(ex.InnerException ?? ex);
                    // Settle the delivery so the prefetch window of 1 is not blocked forever:
                    // requeue on the first failure, give up once it has already been redelivered.
                    channel.BasicNack(ea.DeliveryTag, false, !ea.Redelivered);
                    return;
                }
                channel.BasicAck(ea.DeliveryTag, false);
            };

            channel.BasicConsume(queueName, false, consumer);
        }

        /// <summary>
        /// Throws a descriptive exception when no channel is available, instead of letting
        /// callers fail with a NullReferenceException.
        /// </summary>
        private void EnsureChannel()
        {
            if (channel == null)
            {
                throw new InvalidOperationException(
                    "RabbitMQ channel is not available; check the log for connection errors raised while constructing the listener.");
            }
        }
    }
}
